package top.hypnos.bigdata.filesystem;

import top.hypnos.bigdata.filesystem.command.CommandImpl;
import top.hypnos.bigdata.filesystem.command.CreateNodeCommandExecutor;
import top.hypnos.bigdata.filesystem.command.DeleteNodeCommandExecutor;
import top.hypnos.bigdata.filesystem.commons.Pair;
import top.hypnos.bigdata.filesystem.record.ObjectInputStream;
import top.hypnos.bigdata.filesystem.record.ObjectOutputStream;
import top.hypnos.bigdata.filesystem.record.Record;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 内存数据数
 */
public class MemDataTree implements Record {

    private final AtomicInteger txnIdGenerator = new AtomicInteger(0);

    private MemDataNode root;

    private int txnId = 0;

    private final BlockingQueue<Pair<Command, CompletableFuture<Integer>>> executeQueue = new ArrayBlockingQueue<>(1000);
    private final BlockingQueue<Pair<Command, CompletableFuture<Integer>>> binlogQueue = new ArrayBlockingQueue<>(1000);
    private final BlockingQueue<Pair<Command, CompletableFuture<Integer>>> commitQueue = new ArrayBlockingQueue<>(1000);

    public MemDataTree(BinLog binLog) {
        root = new MemDataNode(0, "", new byte[0]);

        // 启动线程预执行Command
        new Thread(() -> {
            while (true) {
                try {
                    final var pair = executeQueue.take();
                    pair.getA().preExecute(this);
                    binlogQueue.put(pair); // 放入binlog队列，等待写入binlog
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        // 启动线程记录BinLog，记录成功后放入执行队列
        new Thread(() -> {
            while (true) {
                try {
                    final var pair = binlogQueue.take();
                    binLog.append(pair.getA());
                    commitQueue.put(pair);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        // 启动线程记录BinLog，记录成功后放入执行队列
        new Thread(() -> {
            while (true) {
                try {
                    final var pair = commitQueue.take();
                    pair.getA().commit();
                    this.txnId = pair.getA().getTxnId();
                    pair.getB().complete(pair.getA().getTxnId());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

    }

    public MemDataNode getRoot() {
        return root;
    }

    public int getTxnId() {
        return txnId;
    }

    void setTxnId(final int txnId) {
        this.txnId = txnId;
    }

    public MemDataNode getNode(final String path) {
        final var paths = path.split("/");
        MemDataNode currentNode = root;
        for (final var p : paths) {
            if (p.equals("")) {
                continue;
            }
            currentNode = currentNode.getChildren().stream()
                    .filter(n -> n.getPath().equals(p))
                    .findFirst()
                    .orElse(null);
            if (currentNode == null) {
                break;
            }
        }
        return currentNode;
    }

    public int createNode(final String path, final String data) throws InterruptedException, ExecutionException {
        return createNode(path, data.getBytes(StandardCharsets.UTF_8));
    }

    public int createNode(final String path, final byte[] data) throws InterruptedException, ExecutionException {
        final var txnId = txnIdGenerator.incrementAndGet();
        final var command = new CommandImpl<>(txnId, path, data, CreateNodeCommandExecutor.class);
        return executeCommand(command).get();
    }

    public int deleteNode(final String path) throws InterruptedException, ExecutionException {
        final var txnId = txnIdGenerator.incrementAndGet();
        final var command = new CommandImpl<>(txnId, path, DeleteNodeCommandExecutor.class);
        return executeCommand(command).get();
    }

    @Override
    public void serialize(ObjectOutputStream output) throws IOException {
        output.writeInt(getTxnId());
        root.serialize(output);
    }

    @Override
    public void deserialize(ObjectInputStream input) throws IOException {
        setTxnId(input.readInt());
        root = new MemDataNode();
        root.deserialize(input);
    }

    private CompletableFuture<Integer> executeCommand(final Command command) throws InterruptedException {
        // TODO: 需保证入队列顺序与txnId保持一致，待处理
        final var future = new CompletableFuture<Integer>();
        executeQueue.put(Pair.of(command, future)); // 放入对接串行执行
        return future;
    }
}
